Structured Logging
What You Will Learn
- How the Python logging module actually works internally: loggers, handlers, formatters, filters
- Why logging.basicConfig() is the wrong starting point for production
- How to configure structlog with a production processor pipeline
- How to inject correlation IDs into every log line using contextvars
- How to mask PII and sensitive data before logs leave the process
- How to ship logs to Loki and query them with LogQL
- How to make logging non-blocking with QueueHandler + QueueListener
Prerequisites
| Requirement | Details |
|---|---|
| Python 3.11+ | contextvars and asyncio used |
| FastAPI | All examples use FastAPI middleware |
| structlog, python-json-logger | pip install structlog python-json-logger |
| Docker | Loki + Promtail run in containers |
The Incident: Four Hours, One Log Line
Here is the exact configuration that cost a team four hours of debugging:
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# In the request handler:
logger.info(f"Processing order {order_id}")
try:
result = payment_service.charge(order_id, amount)
except Exception as e:
logger.error(f"Payment failed: {e}")
raise
Under load, the service started returning 500s. The logs looked like this:
INFO:app.orders:Processing order 8f3a2c
ERROR:app.orders:Payment failed: Connection timeout
INFO:app.orders:Processing order 9d1b4e
ERROR:app.orders:Payment failed: Connection timeout
INFO:app.orders:Processing order 2a7c91
ERROR:app.orders:Payment failed: Connection timeout
Questions the team could not answer from these logs:
- Which user triggered each order?
- Was it the same user hitting the same endpoint repeatedly?
- What was the amount being charged?
- Was this a pattern affecting one payment method or all?
- When exactly did the failures start?
- How many requests failed vs succeeded in the last 5 minutes?
The same event, logged with structlog, would produce:
{
"timestamp": "2026-03-07T09:14:32.445Z",
"level": "error",
"event": "payment.charge.failed",
"order_id": "8f3a2c",
"user_id": "usr_4492",
"amount_cents": 4999,
"payment_method": "stripe",
"currency": "USD",
"error": "Connection timeout",
"error_type": "requests.exceptions.ConnectTimeout",
"attempt": 1,
"request_id": "req_7e9d3b",
"service": "orders-api",
"version": "3.2.1",
"environment": "production"
}
A single Kibana query: error_type:"ConnectTimeout" AND payment_method:"stripe" answers every question in under two seconds.
1. Python Logging Architecture
The Logger Hierarchy
Python's logging module builds a tree of loggers rooted at the root logger. Every logger has a name. Names use dots as separators to imply hierarchy:
root
├── app
│ ├── app.api
│ │ ├── app.api.routes
│ │ └── app.api.middleware
│ ├── app.services
│ │ ├── app.services.payments
│ │ └── app.services.documents
│ └── app.db
└── uvicorn
├── uvicorn.access
└── uvicorn.error
import logging
# These are the same logger - both call getLogger("app.api.routes")
logger = logging.getLogger(__name__) # in module app/api/routes.py
logger = logging.getLogger("app.api.routes") # explicit name
# The root logger - avoid using this directly
root_logger = logging.getLogger() # or logging.root
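The hierarchy is easiest to see by asking a child logger what level it actually uses. The following is a minimal sketch, assuming hypothetical logger names under a `demo` namespace so that it does not interfere with real application loggers:

```python
import logging

# Hypothetical names, chosen only for this illustration.
parent = logging.getLogger("demo")
child = logging.getLogger("demo.api.routes")

# Only the parent gets an explicit level.
parent.setLevel(logging.WARNING)

# getLogger() returns the same object every time for the same name.
same_child = logging.getLogger("demo.api.routes")

# The child has no level of its own (NOTSET), so its effective level
# comes from the nearest ancestor that has one.
child_own_level = child.level
child_effective_level = child.getEffectiveLevel()

# INFO is below the inherited WARNING threshold, so it is disabled.
info_enabled = child.isEnabledFor(logging.INFO)
```

Setting a level on one logger therefore controls its whole subtree until a descendant sets its own.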
Logger, Handler, Formatter, Filter
When you call logger.info("hello"), here is what happens internally:
logger.info("hello")
│
▼
Logger checks: is INFO >= effective level?
(effective level walks up the hierarchy until it finds a level set)
│ yes
▼
Logger creates a LogRecord and runs its own Filters
│ all pass
▼
Logger passes LogRecord to each of its handlers
AND (if propagate=True) to parent's handlers, recursively up to root
│
▼
Handler checks: is INFO >= handler's own level filter?
│ yes
▼
Handler passes LogRecord through its Filters
│ all pass
▼
Handler passes LogRecord to its Formatter → formats into a string
│
▼
Handler emits the string (writes to file, stream, socket, etc.)
import logging
# Full manual setup - what basicConfig() does automatically but badly
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"%(asctime)s %(name)s %(levelname)s %(message)s"
)
handler.setFormatter(formatter)
logger = logging.getLogger("myapp")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.propagate = False # Don't send to root logger too
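Filters are the one piece of this flow that the setup above does not exercise. As a rough sketch, assuming a hypothetical `/healthz` endpoint whose access logs are noise, a handler-level filter might look like this:

```python
import io
import logging


class DropHealthChecks(logging.Filter):
    """Reject records whose message mentions the health endpoint."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Returning False drops the record for this handler only.
        return "/healthz" not in record.getMessage()


stream = io.StringIO()  # in-memory stream so the output can be inspected
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
handler.addFilter(DropHealthChecks())

logger = logging.getLogger("demo.filters")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False

logger.info("GET /healthz 200")        # rejected by the filter
logger.info("GET /api/documents 200")  # formatted and emitted

output = stream.getvalue()
```

Because the filter sits on the handler, other handlers attached to the same logger would still see the health-check records.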
The propagate=True Trap
This is the most common logging misconfiguration in Python services:
import logging
# You configure your app logger carefully
app_logger = logging.getLogger("myapp")
app_logger.setLevel(logging.INFO)
app_logger.addHandler(my_json_handler)
# But root logger also has basicConfig() called somewhere (by a library)
logging.basicConfig() # Adds a StreamHandler to the root logger
# Now every log from "myapp" goes to:
# 1. my_json_handler (correct)
# 2. The root logger's StreamHandler (duplicate, unformatted)
# Because propagate=True by default!
Fix: Set propagate = False on your top-level application logger, or configure only the root logger and let propagation work for you - but never both.
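The duplicate can be reproduced in a few lines. This sketch uses in-memory streams and a hypothetical logger name, and attaches a plain handler to the root logger to stand in for whatever basicConfig() would have added:

```python
import io
import logging

root_stream = io.StringIO()
app_stream = io.StringIO()

# Stand-in for a handler that basicConfig() (or a library) put on the root.
root_handler = logging.StreamHandler(root_stream)
root = logging.getLogger()
root.addHandler(root_handler)

app_logger = logging.getLogger("demo_propagate")  # hypothetical name
app_logger.setLevel(logging.INFO)
app_logger.addHandler(logging.StreamHandler(app_stream))

app_logger.info("first")      # propagate=True: both handlers fire
app_logger.propagate = False
app_logger.info("second")     # only the app logger's handler fires

root.removeHandler(root_handler)  # leave the root logger as we found it

app_lines = app_stream.getvalue().splitlines()
root_lines = root_stream.getvalue().splitlines()
```

Note that the root handler receives "first" even though the root logger's own level is WARNING: ancestor handlers are consulted, ancestor logger levels are not.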
2. Log Levels: When to Use Each
| Level | Numeric Value | When to Use |
|---|---|---|
| DEBUG | 10 | Detailed diagnostic information useful during development. Database queries, function arguments, intermediate values. Never enabled in production by default. |
| INFO | 20 | Confirmation that things are working as expected. Request received, task started, external API called. Normal operation. |
| WARNING | 30 | Something unexpected happened, but the application can continue. Deprecated API called, fallback used, retry triggered. |
| ERROR | 40 | A serious problem - the application could not perform some function. Exception caught, request failed, data could not be saved. |
| CRITICAL | 50 | A very serious error - the application may be unable to continue. Database unreachable, configuration missing, OOM. |
The Right Level Discipline
import structlog
log = structlog.get_logger()
# DEBUG - never in production hot paths
log.debug("sql.query", sql=query, params=params, duration_ms=duration)
# INFO - normal business events
log.info("document.uploaded", document_id=doc_id, size_bytes=size)
# WARNING - degraded but functional
log.warning(
"cache.miss.fallback",
key=cache_key,
fallback="database",
reason="cache_unavailable",
)
# ERROR - something failed, needs attention
log.error(
"payment.charge.failed",
order_id=order_id,
amount_cents=amount,
error=str(exc),
exc_info=True, # Includes stack trace
)
# CRITICAL - the service is broken
log.critical(
"database.connection.lost",
host=db_host,
error=str(exc),
action="shutting_down",
)
Why DEBUG in Production Kills Throughput
Calling log.debug() with level set to DEBUG generates a LogRecord object, formats it, and writes it to a stream - even if no one is reading debug logs. At 10,000 requests/second, each with 5 debug log calls, that is 50,000 LogRecord object creations per second, 50,000 string format operations, and 50,000 stream writes. This can consume 15–30% of CPU in a Python service.
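The structlog solution: drop disabled levels early and guard expensive arguments.
import logging
import structlog
log = structlog.get_logger()
# structlog.stdlib.filter_by_level, placed first in the processor list, drops
# an event whose level is disabled before the rest of the chain runs.
# Keyword arguments are still evaluated at the call site, and structlog does
# not call a lambda passed as a value, so guard expensive work explicitly:
if log.isEnabledFor(logging.DEBUG):
    log.debug("processing", data=expensive_serialise(large_obj))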
3. structlog - The Production Alternative
structlog is not a replacement for Python's logging module - it is a better front-end that still delegates to logging for output. The key insight is its processor pipeline: a list of callables that each receive and can modify the log event dict before it is rendered.
Installation
pip install structlog
The Processor Pipeline
import structlog
import logging
import sys
from datetime import datetime, timezone
def add_service_context(logger, method, event_dict):
"""Inject service-level fields into every log entry."""
event_dict["service"] = "document-api"
event_dict["version"] = "2.14.0"
event_dict["environment"] = "production"
return event_dict
structlog.configure(
processors=[
# 1. Add log level as a string
structlog.stdlib.add_log_level,
# 2. Add logger name (e.g., "app.api.routes")
structlog.stdlib.add_logger_name,
# 3. Add ISO 8601 timestamp
structlog.processors.TimeStamper(fmt="iso", utc=True),
# 4. Inject service metadata
add_service_context,
# 5. Render exc_info as a traceback string under the "exception" key
structlog.processors.format_exc_info,
# 6. Render stack info if present
structlog.processors.StackInfoRenderer(),
# 7. Merge any extra context added via bind_contextvars
structlog.contextvars.merge_contextvars,
# 8. Render as JSON for production
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
# Usage
log = structlog.get_logger()
log.info("app.started", port=8001)
Output:
{"level": "info", "logger": "app", "timestamp": "2026-03-07T09:14:32.445Z", "service": "document-api", "version": "2.14.0", "environment": "production", "event": "app.started", "port": 8001}
Full Production Configuration for FastAPI
# app/logging_config.py
import logging
import logging.config
import sys
from typing import Any
import structlog
from structlog.types import EventDict, Processor
def _drop_color_message_key(
logger: Any, method: str, event_dict: EventDict
) -> EventDict:
"""Remove uvicorn's 'color_message' field - it is only for terminals."""
event_dict.pop("color_message", None)
return event_dict
def _rename_event_to_message(
logger: Any, method: str, event_dict: EventDict
) -> EventDict:
"""Copy structlog's 'event' key to 'message' for Datadog/Loki compatibility."""
# Keep both for now - some log platforms expect 'message'
event_dict["message"] = event_dict.get("event", "")
return event_dict
def setup_logging(
log_level: str = "INFO",
json_logs: bool = True,
service_name: str = "unknown",
service_version: str = "unknown",
environment: str = "development",
) -> None:
"""
Configure structlog + stdlib logging for production.
Call once at application startup, before any logging occurs.
"""
shared_processors: list[Processor] = [
structlog.contextvars.merge_contextvars,
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.stdlib.ExtraAdder(),
_drop_color_message_key,
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
]
if json_logs:
renderer: Processor = structlog.processors.JSONRenderer()
else:
renderer = structlog.dev.ConsoleRenderer(colors=True)
structlog.configure(
processors=shared_processors + [
# Prepare for stdlib logging integration
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
formatter = structlog.stdlib.ProcessorFormatter(
# Foreign pre-chain: processors for stdlib log records
foreign_pre_chain=shared_processors,
processors=[
# Extract the 'event' key and set on the record
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
renderer,
],
)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
# Configure the root logger - all libraries inherit this
root_logger = logging.getLogger()
root_logger.handlers = [handler]
root_logger.setLevel(log_level.upper())
# Silence noisy libraries
for noisy_logger in ["httpx", "httpcore", "multipart"]:
logging.getLogger(noisy_logger).setLevel(logging.WARNING)
# Bind service-level context - appears in every log line
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(
service=service_name,
version=service_version,
environment=environment,
)
Using structlog in Application Code
# app/api/routes/documents.py
import structlog
from fastapi import APIRouter, Depends, HTTPException, UploadFile
log = structlog.get_logger() # Module-level logger - cheap to create
router = APIRouter(prefix="/api/documents")
@router.post("/")
async def upload_document(
file: UploadFile,
current_user: User = Depends(get_current_user),
):
# Per-request context (request_id and friends) is merged into every log call
# by merge_contextvars, so only call-specific fields are passed here
log.info(
"document.upload.received",
filename=file.filename,
content_type=file.content_type,
user_id=current_user.id,
)
try:
content = await file.read()
doc = await document_service.process(content, filename=file.filename)
log.info(
"document.upload.completed",
document_id=doc.id,
pages=doc.page_count,
duration_ms=doc.processing_time_ms,
)
return {"id": doc.id, "status": "processed"}
except ProcessingError as exc:
log.error(
"document.upload.failed",
filename=file.filename,
error=str(exc),
error_type=type(exc).__name__,
exc_info=True,
)
raise HTTPException(status_code=422, detail="Processing failed")
4. Correlation IDs
Without correlation IDs, log aggregation is nearly useless. If 100 concurrent requests are being processed, log lines from different requests are interleaved and you cannot reconstruct what happened for any single request.
A correlation ID (also called request_id or trace_id) is a unique identifier generated at the boundary of your system (usually the API gateway or the first service) and injected into every log line for that request.
contextvars: The Async-Safe Solution
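Python's threading.local does not work with asyncio, because many coroutines run on the same thread and would all share one value. contextvars.ContextVar is the correct solution: every asyncio task runs in its own copy of the context, so a value set while handling one request is invisible to the others.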
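The isolation can be checked without any web framework. Here is a small sketch using only the standard library; the variable and function names are illustrative and not part of the middleware:

```python
import asyncio
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar("request_id", default="unset")


async def read_id() -> str:
    # A plainly awaited coroutine runs in its caller's context.
    return request_id.get()


async def handle(rid: str) -> str:
    request_id.set(rid)
    await asyncio.sleep(0)  # yield so the two tasks interleave
    return await read_id()


async def main() -> tuple[list[str], str]:
    # gather() wraps each coroutine in a Task; each Task gets a copy of
    # the current context, so the set() calls do not collide.
    seen = await asyncio.gather(handle("req-a"), handle("req-b"))
    # Changes made inside the tasks are not visible out here.
    return list(seen), request_id.get()


seen_ids, outer_value = asyncio.run(main())
```

Each task reads back the ID it set, and the outer context still holds the default.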
# app/middleware/correlation_id.py
import uuid
from contextvars import ContextVar
from typing import Optional
import structlog
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
# Module-level ContextVar - one per process, but each coroutine gets its own value
_request_id_var: ContextVar[Optional[str]] = ContextVar(
"request_id", default=None
)
def get_request_id() -> Optional[str]:
return _request_id_var.get()
class CorrelationIdMiddleware(BaseHTTPMiddleware):
"""
Extracts or generates a request ID and binds it to structlog contextvars.
The request ID appears in every log line emitted during the request,
without any code in route handlers needing to pass it explicitly.
"""
async def dispatch(self, request: Request, call_next) -> Response:
# Honour upstream request ID (from API gateway, load balancer)
request_id = (
request.headers.get("X-Request-ID")
or request.headers.get("X-Correlation-ID")
or str(uuid.uuid4())
)
# Set in ContextVar for any code that needs it directly
token = _request_id_var.set(request_id)
# Bind to structlog contextvars - ALL log calls in this coroutine
# (and any coroutines it awaits) will include request_id
structlog.contextvars.bind_contextvars(
request_id=request_id,
http_method=request.method,
http_path=request.url.path,
)
try:
response = await call_next(request)
structlog.contextvars.bind_contextvars(
http_status=response.status_code,
)
# Echo the request ID back in the response for client-side correlation
response.headers["X-Request-ID"] = request_id
return response
finally:
# Remove only the per-request keys. clear_contextvars() would also drop
# the service, version and environment fields bound in setup_logging().
structlog.contextvars.unbind_contextvars(
"request_id", "http_method", "http_path", "http_status",
)
_request_id_var.reset(token)
Registering the Middleware
# app/main.py
from fastapi import FastAPI
from app.middleware.correlation_id import CorrelationIdMiddleware
from app.logging_config import setup_logging
def create_app() -> FastAPI:
setup_logging(
log_level="INFO",
json_logs=True,
service_name="document-api",
service_version="2.14.0",
environment="production",
)
app = FastAPI(title="Document API")
# Starlette puts the most recently added middleware outermost, so when there
# are several, add the correlation ID middleware last: it has to wrap every
# middleware and route handler that logs
app.add_middleware(CorrelationIdMiddleware)
return app
Log Output with Correlation IDs
All log lines for the same request share the same request_id:
{"timestamp": "2026-03-07T09:14:32.100Z", "level": "info", "event": "document.upload.received", "request_id": "req_7e9d3b", "http_method": "POST", "http_path": "/api/documents", "filename": "report.pdf", "user_id": "usr_4492"}
{"timestamp": "2026-03-07T09:14:32.234Z", "level": "info", "event": "document.ocr.started", "request_id": "req_7e9d3b", "pages": 12, "engine": "tesseract"}
{"timestamp": "2026-03-07T09:14:33.891Z", "level": "info", "event": "document.upload.completed", "request_id": "req_7e9d3b", "document_id": "doc_8f3a", "duration_ms": 1791}
In Kibana or Loki, filtering by request_id: "req_7e9d3b" reconstructs the entire request lifecycle instantly, across any number of log lines.
5. JSON Logging
For production, log lines must be machine-parseable JSON. Human-readable logs are fine for local development but impossible to query at scale.
structlog JSONRenderer
The JSONRenderer processor (shown in the configuration above) handles this. It serialises the event dict to a single-line JSON string using json.dumps().
For special types that are not JSON-serialisable, provide a custom default serialiser:
import json
import uuid
from datetime import datetime, date
from decimal import Decimal
from enum import Enum
def _json_serialiser(obj):
"""Handle types that json.dumps cannot serialise by default."""
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, date):
return obj.isoformat()
if isinstance(obj, uuid.UUID):
return str(obj)
if isinstance(obj, Decimal):
return float(obj)
if isinstance(obj, Enum):
return obj.value
if isinstance(obj, bytes):
return obj.decode("utf-8", errors="replace")
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serialisable")
# Use in structlog configuration:
structlog.processors.JSONRenderer(serializer=json.dumps, default=_json_serialiser)
Standard Fields
Establish a standard set of fields that appear in every log line. This makes querying consistent across services:
| Field | Type | Example | Purpose |
|---|---|---|---|
| timestamp | ISO 8601 string | 2026-03-07T09:14:32.445Z | Time-based queries |
| level | string | info, error | Severity filtering |
| event | string | document.upload.failed | Event type - use dot notation |
| service | string | document-api | Which service logged this |
| version | string | 2.14.0 | Which version was running |
| environment | string | production | Which environment |
| request_id | string | req_7e9d3b | Correlation across log lines |
| user_id | string | usr_4492 | Correlate to user actions |
| error | string | Connection timeout | Error message |
| error_type | string | ConnectTimeout | Exception class name |
python-json-logger for stdlib Compatibility
If you need stdlib logging compatibility (e.g., third-party libraries that use logger.error()), use python-json-logger:
# pip install python-json-logger
import logging
from pythonjsonlogger import jsonlogger
class CustomJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict):
super().add_fields(log_record, record, message_dict)
log_record["service"] = "document-api"
log_record["environment"] = "production"
# Rename 'message' → 'event' for consistency with structlog
if "message" in log_record:
log_record["event"] = log_record.pop("message")
handler = logging.StreamHandler()
handler.setFormatter(CustomJsonFormatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
logging.getLogger().addHandler(handler)
6. Sensitive Data Masking
Logs will contain sensitive data. It is a matter of when, not if. A developer adds log.debug("user.login", user=user_dict) and suddenly passwords, API keys, and email addresses are in your log aggregator - which has far weaker access controls than your application database.
Intercept this at the structlog processor level.
Field-Level Masking Processor
# app/logging/masking.py
import re
from typing import Any
from structlog.types import EventDict
# Fields that should always be masked entirely
SENSITIVE_FIELDS = frozenset({
"password",
"password_hash",
"secret",
"api_key",
"token",
"access_token",
"refresh_token",
"authorization",
"credit_card",
"card_number",
"cvv",
"ssn",
"private_key",
})
# Patterns for masking in string values
_CARD_PATTERN = re.compile(r"\b(?:\d[ -]?){13,16}\b")
_EMAIL_PATTERN = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
_JWT_PATTERN = re.compile(r"\beyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\b")
_AWS_KEY_PATTERN = re.compile(r"\bAKIA[0-9A-Z]{16}\b")
def _mask_string(value: str) -> str:
"""Apply regex masking to a string value."""
value = _CARD_PATTERN.sub("[CARD_MASKED]", value)
value = _JWT_PATTERN.sub("[JWT_MASKED]", value)
value = _AWS_KEY_PATTERN.sub("[AWS_KEY_MASKED]", value)
# Partial masking for emails: keep domain, mask local part
def mask_email(m: re.Match) -> str:
local, domain = m.group(0).rsplit("@", 1)
masked_local = local[0] + "***" if local else "***"
return f"{masked_local}@{domain}"
value = _EMAIL_PATTERN.sub(mask_email, value)
return value
def _mask_value(key: str, value: Any, depth: int = 0) -> Any:
"""Recursively mask sensitive values."""
if depth > 5: # Prevent infinite recursion on circular structures
return value
# Mask entire field if the key is sensitive
if isinstance(key, str) and key.lower() in SENSITIVE_FIELDS:
return "[MASKED]"
if isinstance(value, str):
return _mask_string(value)
if isinstance(value, dict):
return {
k: _mask_value(k, v, depth + 1)
for k, v in value.items()
}
if isinstance(value, (list, tuple)):
masked = [_mask_value(str(i), item, depth + 1) for i, item in enumerate(value)]
return type(value)(masked)
return value
def mask_sensitive_data(logger, method: str, event_dict: EventDict) -> EventDict:
"""
structlog processor that masks PII and secrets before log output.
Place this LAST before the renderer so all processors have had a chance
to add their fields, but no sensitive data reaches the output.
"""
return {
key: _mask_value(key, value)
for key, value in event_dict.items()
}
Integrating the Masking Processor
# In setup_logging(), add to the processors list, just before the renderer:
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.format_exc_info,
# Masking runs after all context is assembled, before rendering
mask_sensitive_data,
structlog.processors.JSONRenderer(),
],
...
)
Test the Masking
log.info(
    "user.login",
    email="alice@example.com",      # Local part must be partially masked
    password="hunter2",             # Must be masked
    api_key="sk-abc123def456",      # Must be masked
    user_data={
        "name": "Alice",
        "token": "eyJhbGciOi...",   # Must be masked
        "payment": {
            "card_number": "4111 1111 1111 1111",  # Must be masked
        },
    },
)
Expected output (card_number is masked by key name, so the whole value becomes [MASKED]; [CARD_MASKED] only appears when a card number turns up inside some other string field):
{
  "event": "user.login",
  "email": "a***@example.com",
  "password": "[MASKED]",
  "api_key": "[MASKED]",
  "user_data": {
    "name": "Alice",
    "token": "[MASKED]",
    "payment": {
      "card_number": "[MASKED]"
    }
  }
}
7. Log Aggregation with Loki
Loki is Grafana's log aggregation system. Unlike Elasticsearch, it does not index log content - it indexes only labels (similar to Prometheus). This makes it very cheap to run.
docker-compose Setup: Python Service + Loki + Promtail + Grafana
# docker-compose.yml
version: "3.9"
services:
app:
build: .
ports:
- "8001:8001"
environment:
- LOG_LEVEL=INFO
- LOG_FORMAT=json
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
tag: "{{.Name}}"
loki:
image: grafana/loki:2.9.4
ports:
- "3100:3100"
command: -config.file=/etc/loki/local-config.yaml
volumes:
- ./config/loki.yaml:/etc/loki/local-config.yaml
- loki_data:/loki
promtail:
image: grafana/promtail:2.9.4
volumes:
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- /var/run/docker.sock:/var/run/docker.sock
- ./config/promtail.yaml:/etc/promtail/config.yaml
command: -config.file=/etc/promtail/config.yaml
grafana:
image: grafana/grafana:10.3.3
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
- GF_AUTH_ANONYMOUS_ENABLED=true
volumes:
- grafana_data:/var/lib/grafana
- ./config/grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
volumes:
loki_data:
grafana_data:
Promtail Configuration
# config/promtail.yaml
server:
http_listen_port: 9080
grpc_listen_port: 0
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: docker
docker_sd_configs:
- host: unix:///var/run/docker.sock
refresh_interval: 5s
relabel_configs:
# Use container name as the 'container' label in Loki
- source_labels: ["__meta_docker_container_name"]
target_label: container
regex: "/(.*)"
# Label each line with the Docker output stream it came from (stdout or stderr)
- source_labels: ["__meta_docker_container_log_stream"]
target_label: stream
pipeline_stages:
# Parse JSON log lines and extract fields for the later stages
- json:
expressions:
level: level
service: service
environment: environment
request_id: request_id
# Promote low-cardinality fields to Loki labels (for fast filtering).
# request_id is left out on purpose: one label value per request would bloat the index.
- labels:
level:
service:
environment:
# Use the timestamp from the log line itself
- timestamp:
source: timestamp
format: RFC3339Nano
LogQL Queries
LogQL is Loki's query language. It looks like PromQL but operates on log streams:
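# All error logs from the document-api. The time range (for example the last hour)
# is set in Grafana's time picker or in the API's start/end parameters, not in the query.
{service="document-api", level="error"}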
# Count error rate per minute, grouped by service
sum by (service) (
rate({level="error"}[1m])
)
# Find all logs for a specific request
{service="document-api"} | json | request_id="req_7e9d3b"
# Find slow requests (duration > 2000ms) - requires numeric extraction
{service="document-api"} | json | duration_ms > 2000
# Find payment failures, extract and show relevant fields
{service="orders-api"}
| json
| event="payment.charge.failed"
| line_format "{{.user_id}} {{.amount_cents}} {{.error}}"
# Alert query: error rate > 1% of total requests
(
sum(rate({service="document-api", level="error"}[5m]))
/
sum(rate({service="document-api"}[5m]))
) > 0.01
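The same queries can be sent to Loki's HTTP API, where the time window travels as start and end parameters next to the query string. What follows is a sketch that only builds the request URL; it assumes Loki is reachable on the port published in the docker-compose file, and the helper names are illustrative:

```python
from datetime import datetime, timedelta, timezone
from urllib.parse import parse_qs, urlencode, urlsplit

# Assumption: Loki is reachable on the port published in docker-compose.yml.
LOKI_BASE_URL = "http://localhost:3100"


def to_epoch_ns(moment: datetime) -> int:
    """Convert an aware datetime to Unix epoch nanoseconds (second precision)."""
    return int(moment.timestamp()) * 1_000_000_000


def build_query_range_url(
    logql: str, end: datetime, window: timedelta, limit: int = 100
) -> str:
    """Build a GET URL for Loki's /loki/api/v1/query_range endpoint."""
    params = {
        "query": logql,
        "start": to_epoch_ns(end - window),
        "end": to_epoch_ns(end),
        "limit": limit,
        "direction": "backward",  # newest entries first
    }
    return f"{LOKI_BASE_URL}/loki/api/v1/query_range?{urlencode(params)}"


url = build_query_range_url(
    '{service="document-api", level="error"}',
    end=datetime(2026, 3, 7, 10, 0, tzinfo=timezone.utc),
    window=timedelta(hours=1),
)
decoded = parse_qs(urlsplit(url).query)
```

Fetching the URL, for instance with urllib.request.urlopen, should return a JSON document containing the matching streams.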
8. Non-Blocking Async Logging
File and network I/O in logging handlers is synchronous. In a high-throughput FastAPI service, synchronous log writes can add 1–5ms of latency per request, and under load they can become a bottleneck.
QueueHandler + QueueListener
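Python's logging.handlers.QueueHandler puts log records onto a queue.Queue. A separate QueueListener thread drains the queue and passes records to the real handler, so the thread that logs never blocks on I/O. The trade-off with a bounded queue is loss under pressure: QueueHandler enqueues with put_nowait(), so when the queue is full the record is dropped and the error is reported through the handler's handleError().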
# app/logging/async_handler.py
import logging
import logging.handlers
import queue
import threading
from typing import Optional
class NonBlockingLogSetup:
"""
Sets up a QueueHandler → QueueListener → actual_handler pipeline.
The QueueHandler is added to the root logger. All log calls are
O(1) - they just put a record on the queue. The QueueListener
runs in a daemon thread and does the actual I/O.
"""
def __init__(self, handler: logging.Handler, maxsize: int = 10_000):
self._queue: queue.Queue = queue.Queue(maxsize=maxsize)
self._handler = handler
self._queue_handler = logging.handlers.QueueHandler(self._queue)
self._listener: Optional[logging.handlers.QueueListener] = None
def start(self) -> None:
"""Call once at application startup."""
self._listener = logging.handlers.QueueListener(
self._queue,
self._handler,
respect_handler_level=True,
)
self._listener.start()
root = logging.getLogger()
root.handlers = [self._queue_handler]
def stop(self) -> None:
"""Call at application shutdown - flushes remaining records."""
if self._listener:
self._listener.stop()
@property
def queue_size(self) -> int:
return self._queue.qsize()
@property
def queue_full(self) -> bool:
return self._queue.full()
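One caveat is worth checking before combining this class with the structlog formatter. The stock QueueHandler.prepare() formats each record and overwrites record.msg with the resulting string before enqueueing it. structlog's ProcessorFormatter appears to rely on record.msg still being the event dict, so the pre-formatting would likely break the rendering step on the listener side. The sketch below shows the stdlib behaviour and one possible workaround; PassThroughQueueHandler is a hypothetical name, not something structlog or the standard library provides:

```python
import logging
import logging.handlers
import queue


class PassThroughQueueHandler(logging.handlers.QueueHandler):
    """Hypothetical subclass: enqueue the record without pre-formatting it."""

    def prepare(self, record: logging.LogRecord) -> logging.LogRecord:
        # The stock prepare() formats the record and replaces record.msg
        # with a string. Returning the record untouched keeps a dict
        # payload intact for the formatter on the listener side.
        return record


def enqueue_event(handler: logging.handlers.QueueHandler, q: queue.Queue) -> object:
    """Send one record with a dict payload through the handler and read it back."""
    record = logging.LogRecord(
        name="demo", level=logging.INFO, pathname="demo.py", lineno=0,
        msg={"event": "app.started"}, args=(), exc_info=None,
    )
    handler.handle(record)
    return q.get_nowait().msg


stock_q: queue.Queue = queue.Queue()
passthrough_q: queue.Queue = queue.Queue()

stock_msg = enqueue_event(logging.handlers.QueueHandler(stock_q), stock_q)
kept_msg = enqueue_event(PassThroughQueueHandler(passthrough_q), passthrough_q)
```

Skipping prepare() is only reasonable for an in-process queue.Queue: the record is neither copied nor made picklable, so this would not suit a multiprocessing queue.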
Integration with FastAPI Lifespan
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.logging.async_handler import NonBlockingLogSetup
from app.logging_config import setup_logging
import structlog
import sys
import logging
log = structlog.get_logger()
_log_setup: NonBlockingLogSetup | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global _log_setup
# Set up the actual JSON handler
json_handler = logging.StreamHandler(sys.stdout)
json_handler.setFormatter(get_structlog_formatter()) # placeholder: supply your own formatter, e.g. the ProcessorFormatter built in setup_logging()
# Wrap it in a non-blocking queue handler
_log_setup = NonBlockingLogSetup(json_handler, maxsize=50_000)
_log_setup.start()
log.info("app.started", log_mode="async_queue")
yield
# Flush all remaining log records before shutdown
log.info("app.stopping", pending_log_records=_log_setup.queue_size)
_log_setup.stop()
app = FastAPI(lifespan=lifespan)
Measuring the Difference
# benchmark/logging_benchmark.py
import asyncio
import time
import logging
import logging.handlers
import queue
import sys
N = 100_000
async def bench_sync_logging():
"""Synchronous StreamHandler - blocks on each write."""
handler = logging.StreamHandler(open("/dev/null", "w"))
logger = logging.getLogger("sync_bench")
logger.handlers = [handler]
logger.setLevel(logging.INFO)
start = time.perf_counter()
for i in range(N):
logger.info("benchmark event %d", i)
elapsed = time.perf_counter() - start
print(f"Sync: {N} logs in {elapsed:.3f}s = {N/elapsed:.0f} logs/sec")
async def bench_async_logging():
"""QueueHandler - non-blocking, returns immediately."""
q = queue.Queue(maxsize=200_000)
actual_handler = logging.StreamHandler(open("/dev/null", "w"))
queue_handler = logging.handlers.QueueHandler(q)
listener = logging.handlers.QueueListener(q, actual_handler)
listener.start()
logger = logging.getLogger("async_bench")
logger.handlers = [queue_handler]
logger.setLevel(logging.INFO)
start = time.perf_counter()
for i in range(N):
logger.info("benchmark event %d", i)
elapsed = time.perf_counter() - start
listener.stop()
print(f"Async: {N} logs in {elapsed:.3f}s = {N/elapsed:.0f} logs/sec")
asyncio.run(bench_sync_logging())
asyncio.run(bench_async_logging())
Typical results on a modern machine:
| Mode | Throughput | Notes |
|---|---|---|
| Synchronous StreamHandler | ~80,000 logs/sec | Blocks on each write() syscall |
| QueueHandler (non-blocking) | ~600,000 logs/sec | Only touches a Python queue |
| Difference | 7.5x faster | More dramatic with network handlers |
For a network handler (sending to Loki directly), the difference is 50–100x.
Complete Production Logging Module
# app/logging_config.py - drop this into any FastAPI service
"""
Production logging configuration.

Usage:
    from app.logging_config import setup_logging
    setup_logging()  # Call once at startup

    import structlog
    log = structlog.get_logger()
    log.info("my.event", key="value")
"""
import logging
import logging.handlers
import queue
import sys
from typing import TYPE_CHECKING, Optional

import structlog
from structlog.types import EventDict

from app.logging.masking import mask_sensitive_data

if TYPE_CHECKING:
    # Only needed for the return annotation of setup_logging().
    from app.logging.async_handler import NonBlockingLogSetup


def _drop_color_message_key(logger, method: str, event_dict: EventDict) -> EventDict:
    # Uvicorn adds a duplicate, ANSI-coloured copy of the message; drop it.
    event_dict.pop("color_message", None)
    return event_dict


def _add_open_telemetry_spans(logger, method: str, event_dict: EventDict) -> EventDict:
    """
    Inject current OpenTelemetry trace/span IDs into the log record.
    This enables jumping from a log line to the corresponding trace in Jaeger.
    Covered in Lesson 03.
    """
    try:
        from opentelemetry import trace

        span = trace.get_current_span()
        if span.is_recording():
            ctx = span.get_span_context()
            event_dict["trace_id"] = format(ctx.trace_id, "032x")
            event_dict["span_id"] = format(ctx.span_id, "016x")
    except ImportError:
        # OpenTelemetry is optional; log without trace IDs if it is not installed.
        pass
    return event_dict


def get_processors(json_logs: bool) -> list:
    """Return the shared processor chain; the last element is always the renderer."""
    processors = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso", utc=True),
        structlog.stdlib.ExtraAdder(),
        _drop_color_message_key,
        _add_open_telemetry_spans,
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        # Runs after format_exc_info so it can also see the formatted traceback.
        mask_sensitive_data,
    ]
    if json_logs:
        processors.append(structlog.processors.JSONRenderer())
    else:
        processors.append(structlog.dev.ConsoleRenderer(colors=True))
    return processors


def setup_logging(
    log_level: str = "INFO",
    json_logs: bool = True,
    service_name: str = "unknown",
    service_version: str = "unknown",
    environment: str = "development",
    async_queue: bool = True,
    queue_maxsize: int = 50_000,
) -> Optional["NonBlockingLogSetup"]:
    """Configure structlog + stdlib logging for production."""
    processors = get_processors(json_logs)

    # structlog loggers run everything except the renderer, then hand the
    # event dict to the stdlib formatter below for rendering.
    structlog.configure(
        processors=processors[:-1] + [
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # Records from plain stdlib loggers (third-party libraries) go through the
    # same chain via foreign_pre_chain, so all output shares one format.
    formatter = structlog.stdlib.ProcessorFormatter(
        foreign_pre_chain=processors[:-1],
        processors=[
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            processors[-1],  # The renderer
        ],
    )

    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(formatter)

    root = logging.getLogger()
    root.setLevel(log_level.upper())

    log_setup = None
    if async_queue:
        # NonBlockingLogSetup is expected to attach its own QueueHandler to the
        # root logger and forward records to stream_handler.
        from app.logging.async_handler import NonBlockingLogSetup

        log_setup = NonBlockingLogSetup(stream_handler, maxsize=queue_maxsize)
        log_setup.start()
    else:
        root.handlers = [stream_handler]

    # Silence noisy libraries
    for noisy in ["httpx", "httpcore", "multipart", "botocore", "boto3"]:
        logging.getLogger(noisy).setLevel(logging.WARNING)

    # Bind service context to all log lines.
    # Note: middleware that calls clear_contextvars() per request removes these
    # bindings, so rebind them there if you use that pattern.
    structlog.contextvars.bind_contextvars(
        service=service_name,
        version=service_version,
        environment=environment,
    )
    return log_setup
Interview Questions and Answers
Q1: What is the difference between logger.propagate = False and logger.setLevel(logging.CRITICAL) for silencing a logger?
Setting logger.setLevel(logging.CRITICAL) makes the logger reject everything below CRITICAL at the source: no LogRecord is created and no handlers are called. CRITICAL records still get through, so to silence a logger completely, set logger.disabled = True or use a level above CRITICAL. Setting logger.propagate = False stops records from being passed to the handlers of ancestor loggers, but the logger's own handlers still receive and process them. They solve different problems: use setLevel to cut down what a logger emits, and use propagate = False to prevent double-logging when handlers are attached to both a child logger and a parent.
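The difference is easy to see with two in-memory handlers. This is a minimal stdlib-only sketch; the logger names demo and demo.child and the ListHandler class are made up for illustration.

```python
import logging


class ListHandler(logging.Handler):
    """Collects messages in a list so the effect of each setting is visible."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


parent_handler = ListHandler()
child_handler = ListHandler()

parent = logging.getLogger("demo")        # hypothetical logger names
child = logging.getLogger("demo.child")
parent.setLevel(logging.DEBUG)
parent.propagate = False                  # keep the demo away from the root logger
parent.addHandler(parent_handler)
child.addHandler(child_handler)

# Default: the record reaches the child's handler and propagates to the parent's.
child.warning("first")

# propagate = False: only the child's own handler sees the record.
child.propagate = False
child.warning("second")

# setLevel(CRITICAL): WARNING is rejected before any handler is called,
# but CRITICAL records still pass.
child.setLevel(logging.CRITICAL)
child.warning("third")
child.critical("fourth")
```

After this runs, child_handler.messages holds "first", "second" and "fourth", while parent_handler.messages holds only "first".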
Q2: Why is contextvars.ContextVar necessary for correlation IDs in async Python, and why does threading.local fail?
In asyncio, multiple coroutines run on the same OS thread. threading.local stores values per thread, so all concurrent coroutines on the same event loop thread share the same threading.local value - they would see each other's request_id. ContextVar stores values per Context, and each Task gets its own copy of the current context when it is created with asyncio.create_task() (a coroutine that is awaited directly runs in its caller's context). Each request-handling task therefore has its own isolated request_id even though all tasks share a thread.
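A short stdlib-only sketch makes the failure visible. Two tasks each store their ID in both a ContextVar and a threading.local, yield to each other, and then read the values back. The names request_id_var, thread_state and handle are illustrative, not part of the lesson's code.

```python
import asyncio
import contextvars
import threading

request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    "request_id", default="-"
)
thread_state = threading.local()


async def handle(request_id: str, seen: dict) -> None:
    # Store the ID both ways, then yield so the other task can run.
    request_id_var.set(request_id)
    thread_state.request_id = request_id
    await asyncio.sleep(0.01)
    # Read back what each mechanism reports after the other task has run.
    seen[request_id] = (request_id_var.get(), thread_state.request_id)


async def main() -> dict:
    seen: dict = {}
    # gather() wraps each coroutine in its own Task, each with its own context copy.
    await asyncio.gather(handle("req-a", seen), handle("req-b", seen))
    return seen


results = asyncio.run(main())
```

Each task reads back its own ID from the ContextVar, but both read "req-b" from threading.local, because the second task overwrote the shared per-thread value before the first one resumed.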
Q3: A teammate adds log.debug("sql.query", sql=query, params=params) inside a tight loop. The service is in production with LOG_LEVEL=INFO. Is there a performance problem?
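With Python's stdlib logging, there is a small but non-zero cost because logger.debug() must check whether DEBUG is enabled. With structlog, cache_logger_on_first_use=True only caches the assembled logger; it does not skip the processor chain. With structlog.stdlib.BoundLogger, a debug() call runs every processor before the stdlib logger discards the record, unless structlog.stdlib.filter_by_level is the first processor (or you use structlog.make_filtering_bound_logger, which turns disabled levels into no-ops). In a tight loop that cost adds up, so put the level filter first. The second danger is eager argument evaluation: log.debug("sql", sql=f"SELECT {' '.join(rows)}") still computes the f-string even if DEBUG is disabled. Passing a lambda does not help, because structlog does not call callables passed as values; it would just render the lambda's repr. The dependable patterns are to pass the raw values (sql=query, params=params) and let the renderer serialise them, or to guard expensive formatting with an explicit check such as if log.isEnabledFor(logging.DEBUG).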
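A level guard is one dependable way to skip expensive argument construction. The sketch below uses the stdlib logger so it runs without extra dependencies; structlog's stdlib BoundLogger proxies isEnabledFor to the underlying logger, so the same guard should carry over. The logger name and expensive_format helper are hypothetical.

```python
import logging

logger = logging.getLogger("demo.sql")  # hypothetical logger name
logger.addHandler(logging.NullHandler())
logger.propagate = False
logger.setLevel(logging.INFO)  # DEBUG is disabled, as in production

format_calls = 0


def expensive_format(query: str) -> str:
    """Stand-in for costly formatting; counts how often it runs."""
    global format_calls
    format_calls += 1
    return " ".join(query.split())


query = "SELECT *   FROM orders   WHERE id = %s"

# Eager: the argument is computed before debug() is even entered.
logger.debug("sql.query %s", expensive_format(query))
eager_calls = format_calls

# Guarded: the formatting is skipped entirely while DEBUG is off.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("sql.query %s", expensive_format(query))
guarded_calls = format_calls - eager_calls
```

Here eager_calls ends up as 1 even though nothing was logged, while guarded_calls stays at 0.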
Q4: How do you prevent a structlog processor from leaking PII if an exception's args tuple contains sensitive data?
The format_exc_info processor renders the traceback, including the exception's str() representation (which normally includes its args), into the exception key of the event dict. To prevent leakage, either run a processor after format_exc_info that scrubs the formatted string, or run one before it that sanitises the exception's args. At the log layer the first option is simpler: place the masking processor after format_exc_info, as mask_sensitive_data is in the pipeline above, and make sure it applies its regex masking to the exception key as well as to ordinary fields. For errors reported to Sentry, the before_send hook (Lesson 04) does the equivalent job.
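A processor of that kind can be sketched as a plain function with the usual (logger, method_name, event_dict) signature. The two regex patterns and the name scrub_exception_text are illustrative assumptions, not the lesson's mask_sensitive_data; a real service would reuse its existing masking rules.

```python
import re
from typing import Any, MutableMapping

# Illustrative patterns only; real rules depend on the data you handle.
_SENSITIVE_PATTERNS = [
    (re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"), "[EMAIL]"),
    (re.compile(r"\b\d{13,16}\b"), "[CARD]"),
]


def scrub_exception_text(
    logger: Any, method_name: str, event_dict: MutableMapping[str, Any]
) -> MutableMapping[str, Any]:
    """Mask sensitive substrings in the already-formatted traceback string."""
    text = event_dict.get("exception")
    if isinstance(text, str):
        for pattern, replacement in _SENSITIVE_PATTERNS:
            text = pattern.sub(replacement, text)
        event_dict["exception"] = text
    return event_dict


# Simulate the event dict as it looks after format_exc_info has run.
event = {
    "event": "payment.failed",
    "exception": "ValueError: charge failed for jane@example.com card 4111111111111111",
}
scrubbed = scrub_exception_text(None, "error", event)
```

In a pipeline, this function would sit directly after structlog.processors.format_exc_info, so that it always sees the rendered string rather than the raw exc_info tuple.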
Q5: Your service is logging 50,000 lines per second. Log storage costs are out of control. What strategies reduce cost without losing observability?
Several complementary approaches:
(1) Log level discipline - ensure INFO logs are only true business events, not debug noise. Audit all log.info() calls and demote noise to DEBUG.
(2) Sampling - log only 1 in 100 successful requests at INFO; log all errors and warnings.
(3) Aggregation - log one summary event such as "batch completed: 1000 items" instead of one line per item.
(4) Metrics for frequency - if you are logging a counter event (cache miss, rate limit hit), replace it with a Prometheus counter and log only on the first occurrence.
(5) Short retention - move from 30-day to 7-day retention for INFO/DEBUG; keep ERROR/CRITICAL for 90 days.
(6) Log levels per service - set noisier services to WARNING in production, with the ability to switch to DEBUG temporarily via a config reload without a redeploy.
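Sampling in particular is straightforward to prototype. The sketch below uses a stdlib logging.Filter attached to a handler and a deterministic counter so the behaviour is easy to verify; a structlog processor could achieve the same effect by raising structlog.DropEvent. The class names and the 1-in-100 rate are assumptions for illustration.

```python
import itertools
import logging


class SampleBelowWarning(logging.Filter):
    """Keep every WARNING+ record, and every Nth record below WARNING."""

    def __init__(self, keep_one_in: int) -> None:
        super().__init__()
        self._keep_one_in = keep_one_in
        self._counter = itertools.count()

    def filter(self, record: logging.LogRecord) -> bool:
        if record.levelno >= logging.WARNING:
            return True
        return next(self._counter) % self._keep_one_in == 0


class CountingHandler(logging.Handler):
    """Records the level name of every record that survives the filter."""

    def __init__(self) -> None:
        super().__init__()
        self.kept: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.kept.append(record.levelname)


handler = CountingHandler()
handler.addFilter(SampleBelowWarning(keep_one_in=100))

logger = logging.getLogger("demo.sampling")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)

for i in range(1000):
    logger.info("request.completed")
    if i % 250 == 0:
        logger.error("request.failed")

info_kept = handler.kept.count("INFO")
errors_kept = handler.kept.count("ERROR")
```

Of the 1,000 INFO records only 10 are kept, while all 4 ERROR records survive. A counter keeps the sample rate exact; random sampling is an alternative when a shared counter is not practical across processes.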
